/*
 *  +----------------------------------------------------------------------
 *  | sophon [ A FAST GAME FRAMEWORK ]
 *  +----------------------------------------------------------------------
 *  | Copyright (c) 2023-2029 All rights reserved.
 *  +----------------------------------------------------------------------
 *  | Licensed ( http:www.apache.org/licenses/LICENSE-2.0 )
 *  +----------------------------------------------------------------------
 *  | Author: jqiris <1920624985@qq.com>
 *  +----------------------------------------------------------------------
 */

use std::time::Duration;

use anyhow::{anyhow, Result};
use prost::Message;
use redis::{self, Commands, ToRedisArgs};
use serde::{de::DeserializeOwned, Serialize};

use crate::stores::redlock::{Lock, RedLock};
use crate::{
    configs::StoreConf,
    encoders::{decode_json_msg, decode_proto_msg, encode_json_msg, encode_proto_msg},
    logger::*,
};

pub struct StoreRedis {
    prefix: String,
    client: redis::Client,
    redlock: RedLock,
    dial_timeout: u64,
}

impl StoreRedis {
    pub fn new(cfg: StoreConf) -> Result<StoreRedis> {
        let (mut user, mut passwd) = ("".to_string(), "".to_string());
        if let Some(v) = cfg.auth_user {
            user = v;
        }
        if let Some(v) = cfg.auth_passwd {
            passwd = v;
        }
        let url = format!(
            "redis://{}:{}@{}:{}/{}",
            user, passwd, cfg.host, cfg.port, cfg.db,
        );
        let client = redis::Client::open(url.clone())?;
        //redlock
        let redlock = RedLock::new(vec![url]);
        let instance = Self {
            prefix: cfg.prefix,
            client,
            redlock,
            dial_timeout: cfg.dial_timeout,
        };
        Ok(instance)
    }

    pub fn get_conn(&self) -> Result<redis::Connection> {
        let conn = self.client.get_connection()?;
        conn.set_read_timeout(Some(Duration::from_secs(self.dial_timeout)))?;
        conn.set_write_timeout(Some(Duration::from_secs(self.dial_timeout)))?;
        Ok(conn)
    }

    pub fn get_key(&self, key: &str) -> String {
        format!("{}:{}", self.prefix, key)
    }
    pub fn get_keys(&self, keys: Vec<String>) -> Vec<String> {
        let mut list = Vec::new();
        for key in keys {
            let real_key = self.get_key(key.as_str());
            list.push(real_key);
        }
        list
    }
    pub fn set<T: Serialize + DeserializeOwned>(
        &mut self,
        key: &str,
        value: T,
        seconds: usize,
    ) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.set_ex(self.get_key(key), data, seconds)?;
        Ok(())
    }
    pub fn set_nx<T: Serialize + DeserializeOwned>(&mut self, key: &str, value: T) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.set_nx(self.get_key(key), data)?;
        Ok(())
    }
    pub fn set_proto<T: Message + Default>(
        &mut self,
        key: &str,
        value: T,
        seconds: usize,
    ) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_proto_msg(Some(value))?;
        conn.set_ex(self.get_key(key), data, seconds)?;
        Ok(())
    }
    pub fn get<T: Serialize + DeserializeOwned>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data: Vec<u8> = conn.get(self.get_key(key))?;
        decode_json_msg(data)
    }
    pub fn get_proto<T: Message + Default>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data: Vec<u8> = conn.get(self.get_key(key))?;
        decode_proto_msg(data)
    }
    pub fn del(&mut self, key: &str) -> Result<()> {
        let mut conn = self.get_conn()?;
        conn.del(self.get_key(key))?;
        Ok(())
    }
    pub fn del_keys(&mut self, keys: Vec<String>) -> Result<()> {
        let mut conn = self.get_conn()?;
        conn.del(self.get_keys(keys))?;
        Ok(())
    }
    pub fn del_pattern(&mut self, pattern: &str) -> Result<()> {
        let mut conn = self.get_conn()?;
        let keys: Vec<String> = conn.keys(self.get_key(pattern))?;
        if keys.len() > 0 {
            conn.del(keys)?;
        }
        Ok(())
    }
    pub fn exists(&mut self, key: &str) -> bool {
        match self.get_conn() {
            Ok(mut conn) => match conn.exists(self.get_key(key)) {
                Ok(v) => v,
                Err(_) => false,
            },
            Err(err) => {
                error!("get redis conn error: {}", err);
                false
            }
        }
    }

    pub fn hset<T: Serialize + DeserializeOwned>(
        &mut self,
        key: &str,
        field: String,
        value: T,
    ) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.hset(self.get_key(key), field, data)?;
        Ok(())
    }
    pub fn hset_nx<T: Serialize + DeserializeOwned>(
        &mut self,
        key: &str,
        field: String,
        value: T,
    ) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.hset_nx(self.get_key(key), field, data)?;
        Ok(())
    }

    pub fn hget<T: Serialize + DeserializeOwned>(&mut self, key: &str, field: String) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.hget(self.get_key(key), field)?;
        decode_json_msg(data)
    }

    pub fn hset_proto<T: Message + Default>(
        &mut self,
        key: &str,
        field: String,
        value: T,
    ) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_proto_msg(Some(value))?;
        conn.hset(self.get_key(key), field, data)?;
        Ok(())
    }
    pub fn hset_proto_nx<T: Message + Default>(
        &mut self,
        key: &str,
        field: String,
        value: T,
    ) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_proto_msg(Some(value))?;
        conn.hset_nx(self.get_key(key), field, data)?;
        Ok(())
    }
    pub fn hget_proto<T: Message + Default>(&mut self, key: &str, field: String) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.hget(self.get_key(key), field)?;
        decode_proto_msg(data)
    }

    pub fn hdel<F: ToRedisArgs>(&mut self, key: &str, fields: F) -> Result<()> {
        let mut conn = self.get_conn()?;
        conn.hdel(self.get_key(key), fields)?;
        Ok(())
    }
    pub fn hexists(&mut self, key: &str, field: String) -> bool {
        match self.get_conn() {
            Ok(mut conn) => match conn.hexists(self.get_key(key), field) {
                Ok(v) => v,
                Err(_) => false,
            },
            Err(err) => {
                error!("get redis conn error: {}", err);
                false
            }
        }
    }
    pub fn hincr<D: ToRedisArgs>(&mut self, key: &str, field: String, incr: D) -> Result<()> {
        let mut conn = self.get_conn()?;
        conn.hincr(self.get_key(key), field, incr)?;
        Ok(())
    }
    pub fn lpush<T: Serialize + DeserializeOwned>(&mut self, key: &str, value: T) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.lpush(self.get_key(key), data)?;
        Ok(())
    }

    pub fn lpop<T: Serialize + DeserializeOwned>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.lpop(self.get_key(key), None)?;
        decode_json_msg(data)
    }

    pub fn lpop_proto<T: Message + Default>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.lpop(self.get_key(key), None)?;
        decode_proto_msg(data)
    }

    pub fn blpop<T: Serialize + DeserializeOwned>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data: Vec<Vec<u8>> = conn.blpop(self.get_key(key), self.dial_timeout as usize)?;
        let value = data.get(1).ok_or(anyhow!("Empty"))?.clone();
        decode_json_msg(value)
    }

    pub fn blpop_proto<T: Message + Default>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data: Vec<Vec<u8>> = conn.blpop(self.get_key(key), self.dial_timeout as usize)?;
        let value = data.get(1).ok_or(anyhow!("Empty"))?.clone();
        decode_proto_msg(value)
    }

    pub fn rpush<T: Serialize + DeserializeOwned>(&mut self, key: &str, value: T) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.rpush(self.get_key(key), data)?;
        Ok(())
    }

    pub fn rpop<T: Serialize + DeserializeOwned>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.rpop(self.get_key(key), None)?;
        decode_json_msg(data)
    }

    pub fn rpop_proto<T: Message + Default>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.rpop(self.get_key(key), None)?;
        decode_proto_msg(data)
    }

    pub fn brpop<T: Serialize + DeserializeOwned>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data: Vec<Vec<u8>> = conn.brpop(self.get_key(key), self.dial_timeout as usize)?;
        let value = data.get(1).ok_or(anyhow!("Empty"))?.clone();
        decode_json_msg(value)
    }
    pub fn brpop_proto<T: Message + Default>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data: Vec<Vec<u8>> = conn.brpop(self.get_key(key), self.dial_timeout as usize)?;
        let value = data.get(1).ok_or(anyhow!("Empty"))?.clone();
        decode_proto_msg(value)
    }

    pub fn scard(&mut self, key: &str) -> Result<i64> {
        let mut conn = self.get_conn()?;
        let num: i64 = conn.scard(self.get_key(key))?;
        Ok(num)
    }

    pub fn sadd<T: Serialize + DeserializeOwned>(&mut self, key: &str, value: T) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_json_msg(Some(value))?;
        conn.sadd(self.get_key(key), data)?;
        Ok(())
    }

    pub fn sadd_proto<T: Message + Default>(&mut self, key: &str, value: T) -> Result<()> {
        let mut conn = self.get_conn()?;
        let data = encode_proto_msg(Some(value))?;
        conn.sadd(self.get_key(key), data)?;
        Ok(())
    }

    pub fn spop<T: Serialize + DeserializeOwned>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.spop(self.get_key(key))?;
        decode_json_msg(data)
    }

    pub fn spop_proto<T: Message + Default>(&mut self, key: &str) -> Result<T> {
        let mut conn = self.get_conn()?;
        let data = conn.spop(self.get_key(key))?;
        decode_proto_msg(data)
    }

    pub fn spop_n<T: Serialize + DeserializeOwned>(
        &mut self,
        key: &str,
        num: i64,
    ) -> Result<Vec<T>> {
        let mut conn = self.get_conn()?;
        let mut list = Vec::new();
        for _ in 0..num {
            let data = conn.spop(self.get_key(key))?;
            list.push(decode_json_msg(data)?);
        }
        Ok(list)
    }

    pub fn spop_n_proto<T: Message + Default>(&mut self, key: &str, num: i64) -> Result<Vec<T>> {
        let mut conn = self.get_conn()?;
        let mut list = Vec::new();
        for _ in 0..num {
            let data = conn.spop(self.get_key(key))?;
            list.push(decode_proto_msg(data)?);
        }
        Ok(list)
    }

    pub fn lock(&self, resource: &str, ttl: usize) -> Result<Lock> {
        let key = self.get_key(resource);
        loop {
            match self.redlock.lock(key.as_bytes(), ttl) {
                Ok(Some(l)) => {
                    return Ok(l);
                }
                Ok(None) => {
                    continue;
                }
                Err(e) => {
                    return Err(anyhow!("lock err:{}", e.to_string()));
                }
            }
        }
    }

    pub fn unlock(&self, lock: &Lock) {
        self.redlock.unlock(lock)
    }
}
